大数据Hive多字节分隔符

您所在的位置:网站首页 hive textfile 分隔符 大数据Hive多字节分隔符

大数据Hive多字节分隔符

2024-02-10 18:48| 来源: 网络整理| 查看: 265

目录 1 应用场景1.1 Hive中的分隔符1.2 特殊数据2.2 需求 3 解决方案一:替换分隔符3.1 方案概述3.2 程序开发3.3 重新建表加载数据3.4 查看结果3.5 总结 4 解决方案二:RegexSerDe正则加载4.1 方案概述4.2 什么是SerDe?4.3 Hive中包含的SerDe4.4 RegexSerDe的功能4.5 RegexSerDe解决多字节分隔符4.6 RegexSerDe解决数据中包含分割符4.7 总结 5 解决方案三:自定义InputFormat5.1 方案概述5.2 自定义InputFormat5.3 基于自定义Input创建表5.4 查看结果 6 总结

1 应用场景 1.1 Hive中的分隔符

Hive中默认使用单字节分隔符来加载文本数据,例如逗号、制表符、空格等等,默认的分隔符为\001。根据不同文件的不同分隔符,我们可以通过在创建表时使用 row format delimited fields terminated by ‘单字节分隔符’ 来指定文件中的分割符,确保正确将表中的每一列与文件中的每一列实现一一对应的关系。在这里插入图片描述

在这里插入图片描述

1.2 特殊数据

在实际工作中,我们遇到的数据往往不是非常规范化的数据,例如我们会遇到以下的两种情况 在这里插入图片描述 ➢ 上图中每列的分隔符为||,为多字节分隔符 ➢ 情况二:数据的字段中包含了分隔符 在这里插入图片描述 ➢ 上图中每列的分隔符为空格,但是数据中包含了分割符,时间字段中也有空格 192.168.88.134 [08/Nov/2020:10:44:32 +0800] “GET / HTTP/1.1” 404 951 2.2 问题与需求 2.2.1 问题 基于上述的两种特殊数据,我们如果使用正常的加载数据的方式将数据加载到表中,就会出以下两种错误: ➢ 情况一:加载数据的分隔符为多字节分隔符 ➢ 创建表 –如果表已存在就删除表

drop table if exists singer; --创建表 create table singer( id string,--歌手id name string,--歌手名称 country string,--国家 province string,--省份 gender string,--性别 works string--作品 ) --指定列的分隔符为|| row format delimited fields terminated by '||';

➢ 加载数据 load data local inpath ‘/export/data/test01.txt’ into table singer;

➢ 查看结果 select * from singer;

在这里插入图片描述 ➢ 问题 数据发生了错位,没有正确的加载每一列的数据 ➢ 原因 Hive中默认只支持单字节分隔符,无法识别多字节分隔符

➢ 情况二:数据中包含了分隔符 ➢ 创建表

--如果表存在,就删除表 drop table if exists apachelog; --创建表 create table apachelog( ip string, --IP地址 stime string, --时间 mothed string, --请求方式 url string, --请求地址 policy string, --请求协议 stat string, --请求状态 body string --字节大小 ) --指定列的分隔符为空格 row format delimited fields terminated by ' ';

➢ 加载数据 load data local inpath ‘/export/data/apache_web_access.log’ into table apachelog;

➢ 查看结果 select * from apachelog; 在这里插入图片描述 ➢ 问题 时间字段被切分成了两个字段,后面所有的字段出现了错位 ➢ 原因 时间数据中包含了分隔符,导致Hive认为这是两个字段,但实际业务需求中,为一个字段

2.2 需求

基于上面两种情况的测试发现,当数据中出现了多字节分隔符或者数据中的某个字段包含了分隔符,就会导致数据加载错位的问题。基于出现的问题,我们需要通过特殊的方法来解决该问题,即使当数据中出现多字节分隔符等情况时,Hive也能正确的加载数据,实现列与数据的一一对应。

3 解决方案一:替换分隔符 3.1 方案概述

面对情况一,如果数据中的分隔符是多字节分隔符,可以使用程序提前将数据中的多字节分隔符替换为单字节分隔符,然后使用Hive加载,就可以实现正确加载对应的数据。 例如:原始数据中的分隔符为“||” 在这里插入图片描述

3.2 程序开发

可以在ETL阶段通过一个MapReduce程序,将“||”替换为单字节的分隔符“|”,示例程序如下:

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; /** * @ClassName ChangeSplitCharMR * @Description TODO MapReduce实现将多字节分隔符转换为单字节符 * @Create By itcast */ public class ChangeSplitCharMR extends Configured implements Tool { public int run(String[] arg) throws Exception { /** * 构建Job */ Job job = Job.getInstance(this.getConf(),"changeSplit"); job.setJarByClass(ChangeSplitCharMR.class); /** * 配置Job */ //input:读取需要转换的文件 job.setInputFormatClass(TextInputFormat.class); Path inputPath = new Path("datas/split/test01.txt"); FileInputFormat.setInputPaths(job,inputPath); //map:调用Mapper job.setMapperClass(ChangeSplitMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); //reduce:不需要Reduce过程 job.setNumReduceTasks(0); //output job.setOutputFormatClass(TextOutputFormat.class); Path outputPath = new Path("datas/output/changeSplit"); TextOutputFormat.setOutputPath(job,outputPath); /** * 提交Job */ return job.waitForCompletion(true) ? 0 : -1; } //程序入口 public static void main(String[] args) throws Exception { //调用run Configuration conf = new Configuration(); int status = ToolRunner.run(conf, new ChangeSplitCharMR(), args); System.exit(status); } public static class ChangeSplitMapper extends Mapper{ //定义输出的Key private Text outputKey = new Text(); //定义输出的Value private NullWritable outputValue = NullWritable.get(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //获取每条数据 String line = value.toString(); //将里面的||转换为| String newLine = line.replaceAll("\\|\\|", "|"); //替换后的内容作为Key this.outputKey.set(newLine); //输出结果 context.write(this.outputKey,this.outputValue); } } }

➢ 程序执行结果如下: 在这里插入图片描述

3.3 重新建表加载数据

➢ 重新创建Hive表

--如果表已存在就删除表 drop table if exists singer; --创建表 create table singer( id string,--歌手id name string,--歌手名称 country string,--国家 province string,--省份 gender string,--性别 works string--作品 ) --指定列的分隔符为|| row format delimited fields terminated by '|';

➢ 在Hive中重新加载数据 load data local inpath ‘/export/data/part-m-00000’ into table singer;

3.4 查看结果

➢ 查看结果 在这里插入图片描述

3.5 总结

在ETL阶段可以直接对数据进行分隔符的替换,通过替换分隔符将多字节分隔符更改为单字节分隔符,就可以解决数据加载的问题,但是这种方式有对应的优缺点,并不是所有的场景适用于该方法。 优点:实现方式较为简单,基于字符串替换即可 缺点:无法满足情况2的需求

4 解决方案二:RegexSerDe正则加载 4.1 方案概述

面对情况一和情况二的问题,Hive中提供了一种特殊的方式来解决,Hive提供了一种特殊的Serde来加载特殊数据的问题,使用正则匹配来加载数据,匹配每一列的数据。 官网地址:https://cwiki.apache.org/confluence/display/Hive/GettingStarted#GettingStarted-ApacheWeblogData 在这里插入图片描述

4.2 什么是SerDe?

Hive的SerDe提供了序列化和反序列化两个功能,SerDe是英文Serialize和Deserilize的组合缩写,用于实现将Hive中的对象进行序列化和将数据进行反序列化。 Serialize就是序列化,用于将Hive中使用的java object转换成能写入hdfs的字节序列,或者其他系统能识别的流文件。Hive中的insert语句用于将数据写入HDFS,所以就会调用序列化实现。Hive中的调用过程如下: 在这里插入图片描述 Deserilize就是反序列化,用于将字符串或者二进制数据流转换成Hive能识别的java object对 象。所有Hive中的Select语句在查询数据时,需要将HDFS中的数据解析为Hive中对象,就需要进行 反序列化。Hive可以方便的将数据加载到表中而不需要对数据进行转换,这样在处理海量数据时可 以节省大量的时间。Hive中的调用过程如下:在这里插入图片描述

4.3 Hive中包含的SerDe

官网地址:https://cwiki.apache.org/confluence/display/Hive/SerDe 在这里插入图片描述 Hive中默认提供了多种SerDe用于解析和加载不同类型的数据文件,常用的有ORCSerde 、RegexSerde、JsonSerDe等。

4.4 RegexSerDe的功能

RegexSerde是Hive中专门为了满足复杂数据场景所提供的正则加载和解析数据的接口,使用RegexSerde可以指定正则表达式加载数据,根据正则表达式匹配每一列数据。上述过程中遇到的情况一和情况二的问题,都可以通过RegexSerDe使用正则表达式来加载实现。

4.5 RegexSerDe解决多字节分隔符

➢ 分析数据格式,构建正则表达式 ➢ 原始数据格式 01||周杰伦||中国||台湾||男||七里香

➢ 正则表达式定义每一列 ([0-9])\|\|(.)\|\|(.)\|\|(.)\|\|(.)\|\|(.)

➢ 正则校验 在这里插入图片描述 ➢ 基于正则表达式,使用RegexSerde建表 –如果表已存在就删除表

drop table if exists singer; --创建表 create table singer( id string,--歌手id name string,--歌手名称 country string,--国家 province string,--省份 gender string,--性别 works string--作品 ) --指定使用RegexSerde加载数据 ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe' --指定正则表达式 WITH SERDEPROPERTIES ( "input.regex" = "([0-9]*)\\|\\|([^}]*)\\|\\|([^}]*)\\|\\|([^}]*)\\|\\|([^}]*)\\|\\|([^}]*)" );

➢ 加载数据 load data local inpath ‘/export/data/test01.txt’ into table singer;

➢ 查看数据结果 select * from singer; 在这里插入图片描述 每一列的数据都被正常的加载,没有错位

4.6 RegexSerDe解决数据中包含分割符

➢ 分析数据格式,构建正则表达式 ➢ 原始数据格式 192.168.88.100 [08/Nov/2020:10:44:33 +0800] “GET /hpsk_sdk/index.html HTTP/1.1” 200 328

➢ 正则表达式定义每一列 ([^ ]) ([^}]) ([^ ]) ([^ ]) ([^ ]) ([0-9]) ([^ ]*)

➢ 正则校验 在这里插入图片描述 ➢ 基于正则表达式,使用RegexSerde建表 –如果表存在,就删除表 drop table if exists apachelog; –创建表 create table apachelog( ip string, --IP地址 stime string, --时间 mothed string, --请求方式 url string, --请求地址 policy string, --请求协议 stat string, --请求状态 body string --字节大小 ) –指定使用RegexSerde加载数据 ROW FORMAT SERDE ‘org.apache.hadoop.hive.serde2.RegexSerDe’ –指定正则表达式 WITH SERDEPROPERTIES ( “input.regex” = “([^ ]) ([^}]) ([^ ]) ([^ ]) ([^ ]) ([0-9]) ([^ ]*)” );

➢ 加载数据 load data local inpath ‘/export/data/apache_web_access.log’ into table apachelog; ➢ 查看数据结果 select ip,stime,url,stat,body from apachelog; 在这里插入图片描述

4.7 总结 RegexSerde使用简单,对于各种复杂的数据场景,都可以通过正则定义匹配每行中的每个字段,基本上可以满足大多数场景的需求,工作中推荐使用该方式来实现对于复杂数据的加载。 5 解决方案三:自定义InputFormat 5.1 方案概述

Hive中也允许使用自定义InputFormat来解决以上问题,通过在自定义InputFormat,来自定义解析逻辑实现读取每一行的数据。

5.2 自定义InputFormat

➢ 自定义InputFormat继承自TextInputFormat,读取数据时将每条数据中的”||”全部替换成“|” ➢ 自定义InputFormat

import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; import java.io.IOException; /** * @ClassName UserInputFormat * @Description TODO 用于实现自定义InputFormat,读取每行数据 * @Create By Itcast */ public class UserInputFormat extends TextInputFormat { @Override public RecordReader getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(genericSplit.toString()); UserRecordReader reader = new UserRecordReader(job,(FileSplit)genericSplit); return reader; } }

➢ 自定义RecordReader

import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.*; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.LineRecordReader; import org.apache.hadoop.mapred.RecordReader; import java.io.IOException; import java.io.InputStream; /** * @ClassName UserRecordReader * @Description TODO 用于自定义读取器,在自定义InputFormat中使用,将读取到的每行数据中的||替换为| * @Create By Itcast */ public class UserRecordReader implements RecordReader { private static final Log LOG = LogFactory.getLog(LineRecordReader.class.getName()); int maxLineLength; private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private LineReader in; private Seekable filePosition; private CompressionCodec codec; private Decompressor decompressor; public UserRecordReader(Configuration job, FileSplit split) throws IOException { this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); codec = compressionCodecs.getCodec(file); FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); if (isCompressedInput()) { decompressor = CodecPool.getDecompressor(codec); if (codec instanceof SplittableCompressionCodec) { final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec) .createInputStream(fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK); in = new LineReader(cIn, job); start = cIn.getAdjustedStart(); end = cIn.getAdjustedEnd(); filePosition = cIn; // take pos from compressed stream } else { in = new LineReader(codec.createInputStream(fileIn, decompressor), job); filePosition = fileIn; } } else { fileIn.seek(start); in = new LineReader(fileIn, job); filePosition = fileIn; } if (start != 0) { start += in.readLine(new Text(), 0, maxBytesToConsume(start)); } this.pos = start; } private boolean isCompressedInput() { return (codec != null); } private int maxBytesToConsume(long pos) { return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos); } private long getFilePosition() throws IOException { long retVal; if (isCompressedInput() && null != filePosition) { retVal = filePosition.getPos(); } else { retVal = pos; } return retVal; } public LongWritable createKey() { return new LongWritable(); } public Text createValue() { return new Text(); } /** * Read a line. */ public synchronized boolean next(LongWritable key, Text value) throws IOException { while (getFilePosition()


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3